/*
 *    Copyright © OpenAtom Foundation.
 *
 *    Licensed under the Apache License, Version 2.0 (the "License");
 *    you may not use this file except in compliance with the License.
 *    You may obtain a copy of the License at
 *
 *         http://www.apache.org/licenses/LICENSE-2.0
 *
 *    Unless required by applicable law or agreed to in writing, software
 *    distributed under the License is distributed on an "AS IS" BASIS,
 *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *    See the License for the specific language governing permissions and
 *    limitations under the License.
 */

package com.inspur.edp.bef.core.session.distributed.dac;

import com.inspur.edp.bef.core.session.CorruptedSessionException;
import com.inspur.edp.bef.core.session.FuncSession;
import com.inspur.edp.bef.core.session.FuncSession.SessionItemMap;
import com.inspur.edp.bef.core.session.FuncSessionUtil;
import com.inspur.edp.bef.core.session.distributed.close.AbnormalClosingUtil;
import com.inspur.edp.bef.core.session.distributed.common.DistributedLockWrapper;
import com.inspur.edp.bef.core.session.distributed.common.NoLock;
import com.inspur.edp.bef.core.session.distributed.common.SessionLockProvider;
import com.inspur.edp.cef.api.session.ICefSessionItem;
import com.inspur.edp.cef.entity.changeset.Tuple;
import com.inspur.edp.commonmodel.core.session.SessionItemRecover;
import com.inspur.edp.commonmodel.core.session.distributed.SessionItemDistributedFactory;
import com.inspur.edp.commonmodel.core.session.distributed.dac.FuncSessionIncrement;
import com.inspur.edp.commonmodel.core.session.distributed.dac.FuncSessionItemIncrement;
import com.inspur.edp.commonmodel.core.session.serviceinterface.SessionConfigService;
import com.inspur.edp.commonmodel.core.session.tableentity.SessionCacheInfo;
import com.inspur.edp.commonmodel.core.session.tableentity.SessionConfig;
import com.inspur.edp.commonmodel.core.util.SessionIncrementUtil;
import io.iec.edp.caf.boot.context.CAFContext;
import io.iec.edp.caf.commons.utils.SpringBeanUtils;
import io.iec.edp.caf.core.context.BizContextManager;
import io.iec.edp.caf.core.context.ICAFContextService;
import io.iec.edp.caf.core.session.CafSession;
import io.iec.edp.caf.core.session.ICafSessionService;
import io.iec.edp.caf.core.session.SessionType;
import io.iec.edp.caf.core.session.core.CAFSessionThreadHolder;
import io.iec.edp.caf.msu.api.ServiceUnitAwareService;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.locks.Lock;
import lombok.extern.slf4j.Slf4j;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.util.StringUtils;

public class FuncSessionDbDac implements FuncSessionDac {

  private static FuncSessionDbDac instance = new FuncSessionDbDac();

  public static FuncSessionDbDac getInstance() {
    return instance;
  }

  private FuncSessionDbDac() {
    FuncSessionUtil.startScheduler(ExpiredCleaner.class, "DBExpiredCleaner", 10,1);
  }

  @Override
  public Lock getFuncInstanceLock(String funcInstId) {
    if (StringUtils.isEmpty(funcInstId)) {
      return NoLock.getInstance();
    }
    return new DistributedLockWrapper("bef-func-lock:".concat(funcInstId));
  }

  @Override
  public Lock getSessionLock(String session) {
    return SessionLockProvider.getDistLock(session);
//    return new DistributedLockWrapper("bef-session-lock:".concat(session));
  }

  @Override
  public Lock getLocalSessionLock(String session) {
    return SessionLockProvider.getNonDistLock(session);
  }

  @Override
  public boolean isLatest(String sessionId, int version) {
    return getUtil().isLatestVersion(sessionId, version);
  }

  public String getSessionListByToken(String token) {
    return AbnormalClosingUtil.getInstance().getSessionId(token, CAFContext.current.getCurrentSU());
  }

  @Override
  public void addSessionWithToken(String userSessionId, String token, String sessionId) {
    AbnormalClosingUtil.getInstance().setSessionRelationCacheInBiz(token, sessionId,
        CAFContext.current.getCurrentSU());
  }

  @Override
  public void removeSessionWithToken(String userSessionId, String token, String sessionId){
    AbnormalClosingUtil.getInstance().removeSessionRelationCacheInBiz(token, sessionId,
        CAFContext.current.getCurrentSU());
  }

  @Override
  public void removeSession(String sessionId) {
    SessionLockProvider.remove(sessionId);
    getUtil().clearSessionCache(sessionId);
  }

  @Override
  public FuncSession recover(String id, FuncSession existing) {
    List<SessionCacheInfo> caches = getUtil()
        .getSessionCacheInfos(id, existing == null ? -1 : existing.getVersion());
    if (caches == null || caches.isEmpty()) {
      return existing;
    }

    Map<String, SessionItemDistributedFactory> recoverFactoryMap = new LinkedHashMap<>();
    List<SessionItemDistributedFactory> recoverFactoryList = new ArrayList<>();
    FuncSession rez;
    if (existing == null) {
      rez = new FuncSession(null, true);
      rez.setSessionId(id);
    } else {
      for (Entry<String, ICefSessionItem> pair : existing.getSessionItems().entrySet()) {
        SessionItemDistributedFactory recover = SpringBeanUtils
            .getBean(pair.getValue().getCategory(), SessionItemDistributedFactory.class);
        recoverFactoryMap.putIfAbsent(pair.getValue().getCategory(), recover);
        recoverFactoryList.add(recover);
      }
      rez = existing;
    }

    List<Integer> lastResetIndex = buildResetIndex(rez, caches);
    for (int i = 0; i < caches.size(); i++) {
      FuncSessionIncrement inc = caches.get(i).getSessionChangeSet();
      if (inc == null) {
        continue;
      }
      if ("true".equals(inc.getCustoms().get("invalid"))){
        throw new CorruptedSessionException(new Exception());
      }
      String contextParam = inc.getCustoms().get("befcontextparam");
      if(contextParam != null) {
        rez.getBefContext().getParams().clear();
        if(!contextParam.equals("")) {
          rez.getBefContext().getParams().putAll(FuncSessionUtil.convertFromBinary(contextParam));
        }
      }
      if (inc.getItemTypes() != null && !inc.getItemTypes().isEmpty()) {
        for (Tuple<String, String> pair : inc.getItemTypes()) {
          SessionItemDistributedFactory recoverFactory = recoverFactoryMap.get(pair.getItem1());
          if (recoverFactory == null) {
            recoverFactoryMap.put(pair.getItem1(), recoverFactory = SpringBeanUtils
                .getBean(pair.getItem1(), SessionItemDistributedFactory.class));
          }
          recoverFactoryList.add(recoverFactory);
          ICefSessionItem sessionItem = (recoverFactory.getRecover()
              .buildSessionItem(rez, pair.getItem2()));
          ((SessionItemMap) rez.getSessionItems()).innerPut(sessionItem.getConfigId(), sessionItem);
        }
      }

      if (inc.getItem() != null && !inc.getItem().isEmpty()) {
        for (FuncSessionItemIncrement itemInc : inc.getItem()) {
          ICefSessionItem sessionItem = ((SessionItemMap) rez.getSessionItems())
              .itemAt(itemInc.getIndex());
          SessionItemRecover recover = recoverFactoryList.get(itemInc.getIndex()).getRecover();
          if (i == lastResetIndex.get(itemInc.getIndex())) {
            recover.reset(sessionItem);
          }
          if (i >= lastResetIndex.get(itemInc.getIndex())) {
            recover.incrementAfterReset(sessionItem, itemInc);
          } else {
            recover.incrementBeforeReset(sessionItem, itemInc);
          }
        }
      }
    }
    rez.setVersion(caches.get(caches.size() - 1).getVersion());
    return rez;
  }

  private static List<Integer> buildResetIndex(FuncSession session, List<SessionCacheInfo> caches) {
    ArrayList<Integer> lastResetIndex = new ArrayList<>(session.getSessionItems().values().size());
    Collections.fill(lastResetIndex, -1);
    int reset = -1;
    for (int i = 0; i < caches.size(); i++) {
      FuncSessionIncrement change = caches.get(i).getSessionChangeSet();
      if (change == null) {
        continue;
      }
      if (change.isReset()) {
        reset = i;
        Collections.fill(lastResetIndex, reset);
      }
      if (change.getItem() != null) {
        for (FuncSessionItemIncrement inc : change.getItem()) {
          if (inc.getIndex() >= lastResetIndex.size()) {
            for (int j = lastResetIndex.size(); j <= inc.getIndex(); ++j) {
              lastResetIndex.add(reset);
            }
          }
          if (inc.isReset()) {
            lastResetIndex.set(inc.getIndex(), i);
          }
        }
      }
    }
    return lastResetIndex;
  }

  @Override
  public void update(FuncSession session, FuncSessionIncrement value) {
    //TODO: 重试
    SessionCacheInfo cacheInfo = new SessionCacheInfo(session.getSessionId(), session.getVersion());
    cacheInfo.setSessionChangeSet(value);
//    cacheInfo.setSessionId(session.getSessionId());
//    cacheInfo.setVersion(session.getVersion());
//    session.getSessionItems().get("").
    getUtil().saveSessionCache(cacheInfo, session);
  }

  //  private SessionIncrementUtil util;
  public SessionIncrementUtil getUtil() {
    return new SessionIncrementUtil();
//    if (util == null) {
//      util = new SessionIncrementUtil();
//    }
//    return util;
  }

  //region schedule job@
  @Slf4j
  @DisallowConcurrentExecution
  public static class ExpiredCleaner implements Job {

    private static HashMap<String, ArrayList<Date>> dates = new HashMap<>();
    private static final Date MinDate = new Date(0);
    //<TenantId, Sulist>
    private static HashMap<String, List<String>> linkInfoRecord;

    @Override
    public void execute(JobExecutionContext arg0) throws JobExecutionException {
      try {
        traversalCleanBySU();
      } catch (Throwable t) {
        log.error("ExpiredCleaner定时任务执行失败, 如果此错误短时间内频繁出现, 请联系Bef组处理", t);
      }
    }

    private static synchronized void traversalCleanBySU() {
      if (linkInfoRecord == null) {
        SessionConfigService service = SpringBeanUtils.getBean(SessionConfigService.class);
        List<SessionConfig> configs = service.getAllConfig();
        List<String> deployedSu = SpringBeanUtils.getBean(ServiceUnitAwareService.class)
            .getEnabledServiceUnits();
        if (deployedSu == null || deployedSu.isEmpty()) {
          log.error("获取已启用Msu列表为空");
          return;
        }

        List<String> specialTenantId = new ArrayList<>();
        HashMap<String, List<String>> record = new HashMap<>();
        for (SessionConfig config : configs) {
          if (!config.getConnectInfo().isUseCache() ||
              !deployedSu.contains(config.getBelongSU()) && !"*".equals(config.getBelongSU()) ||
              specialTenantId.contains(config.getBelongTenant())) {
            continue;
          }
          if ("*".equals(config.getBelongSU())) {
            record.put(config.getBelongTenant(), deployedSu);
            specialTenantId.add(config.getBelongTenant());
            continue;
          }
          if (!record.containsKey(config.getBelongTenant())) {
            ArrayList<String> suCodes = new ArrayList<>();
            suCodes.add(config.getBelongSU());
            record.put(config.getBelongTenant(), suCodes);
          } else {
            List<String> tem = record.get(config.getBelongTenant());
            tem.add(config.getBelongSU());
            record.replace(config.getBelongTenant(), tem);
          }
        }
        linkInfoRecord = record;
      }

      ICafSessionService sessionService = SpringBeanUtils.getBean(ICafSessionService.class);
      try {
        for (Map.Entry<String, List<String>> pair : linkInfoRecord.entrySet()) {
          for (String suCode : pair.getValue()) {
            FuncSessionUtil.clearCafSession();
            CafSession session = sessionService.create(Integer.parseInt(pair.getKey()),
                "sessionDBCleaner", "zh-C".concat("HS"), new HashMap<>(), SessionType.backend);
            CAFSessionThreadHolder.setCurrentSession(session);
            ICAFContextService contextService = SpringBeanUtils.getBean(ICAFContextService.class);
            contextService.setCurrentSU(suCode);
            try {
              traversalCleanByDB(pair.getKey() + "." + suCode);
            } catch (Throwable t) {
              log.error("定时任务失败:" + pair.getKey() + "." + suCode, t);
            }
          }
        }
      } finally {
        FuncSessionUtil.clearCafSession();
      }
    }

    private static void traversalCleanByDB(String tempKey) {
      SessionIncrementUtil sessionUtil = new SessionIncrementUtil(false);
      if (!sessionUtil.isStorageInited()) {
        return;
      }

      ArrayList<Date> temDates = dates.get(tempKey);
      if (temDates == null) {
        temDates = new ArrayList<>();
        dates.put(tempKey, temDates);
      }
      BizContextManager bizContextManager = SpringBeanUtils.getBean(BizContextManager.class);
      int tableNumber = sessionUtil.getDBTableNumber();
      if (temDates.size() == 0 || temDates.size() != tableNumber) {
        temDates.clear();
        for (int i = 0; i < tableNumber; i++) {
          temDates.add(MinDate);
        }
        temDates.trimToSize();
      }
      List<List<String>> expiredIds = new ArrayList<>();
      List<List<SessionCacheInfo>> infos = sessionUtil.traversalCache(temDates);
      //TODO: 多表遍历逻辑提取到SessionIncrementUtil中
      for (int i = 0; i < temDates.size(); i++) {
        List<String> temExpiredIds = new ArrayList<>();
        for (SessionCacheInfo info : infos.get(i)) {
          String id = info.getSessionId();
          if (bizContextManager.isExpired(id)) {
            temExpiredIds.add(id);
          }
        }
        expiredIds.add(temExpiredIds.isEmpty() ? null : temExpiredIds);
        if (infos.get(i).size() >= 500) {
          temDates.set(i, infos.get(i).get(infos.get(i).size() - 1).getCreatedOn());
        } else {
          temDates.set(i, MinDate);
        }
      }
      if (expiredIds.stream().anyMatch(item -> item != null)) {
        sessionUtil.batchClear(expiredIds);
      }
    }
  }
  //endregion
}
